Adaptive (runtime, stats-based) conjunct reordering for FilterExec#22698
Adaptive (runtime, stats-based) conjunct reordering for FilterExec#22698adriangb wants to merge 12 commits into
Conversation
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (5e71ea4) to 85bc5ef (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
a24471d to
4d7b733
Compare
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
…ilter evaluation (apache#22704) ## Which issue does this PR close? <!-- No tracking issue; this is a standalone benchmark contribution. --> This PR does not close an issue. It adds a benchmark suite to support work and discussion around predicate ordering in filter evaluation (e.g. the static reordering in apache#22343 and the runtime/statistics-based reordering explored in apache#22698). It deliberately benchmarks *no specific implementation* — see below. ## Rationale for this change Conjunctive (`AND`) filter evaluation in `FilterExec` is a left-deep `BinaryExpr(And)` chain, and the order conjuncts are evaluated in can change runtime by large factors: once a leading conjunct passes few enough rows the batch is physically compacted before the rest, so a cheap-and-selective predicate evaluated early saves later predicates work. Predicate ordering is therefore an active area (static heuristics, runtime/adaptive schemes, cost models). There is currently no benchmark suite that isolates the dimensions that drive this. Existing macro-benchmarks (TPC-H/DS, ClickBench) only incidentally exercise filter ordering, so they can't show *why* a change to ordering helped or hurt, or guard the order-insensitive case against regressions. ## What changes are included in this PR? A new SQL benchmark suite, `benchmarks/sql_benchmarks/predicate_eval`, built on the existing `.benchmark` template framework (no engine code, no new Rust). It sets no engine config of its own and measures DataFusion's built-in short-circuit by default; a system under test is toggled purely via its native `DATAFUSION_EXECUTION_*` env var (the bench harness builds its `SessionContext` with `SessionConfig::from_env`), so the same scenarios can characterise the baseline, a static heuristic, an adaptive scheme, or a cost model and be compared apples-to-apples. It is organised into 10 subgroups (select with `BENCH_SUBGROUP`), each varying one property of conjunctive filter evaluation while holding the others fixed: | Subgroup | What it varies (others held fixed) | |---|---| | `costsel` | cost and selectivity point in different directions (expensive predicate is the selective one) | | `cost` | per-predicate cost, at equal selectivity | | `selectivity` | per-predicate selectivity, at equal cost | | `cardinality` | conjunct count `k = 2/4/8/16` | | `width` | string-column width (`PRED_FILL` = 2 / 30 / 170 chars) | | `scale` | row count `5k / 100k / 5M / 50M` | | `neutral` | predicates are interchangeable (equal cost, none selective) — an order-insensitive control | | `correlation` | conditional vs marginal selectivity (independent / positively / anti-correlated) | | `drift` | selectivity that changes across the scan | | `nulls` | null density (two- vs three-valued predicate results) | Each query's comment notes the per-predicate cost/selectivity that the data generation hides from the SQL. Data is synthetic and generated inline by each subgroup's load SQL (no external files); `PRED_ROWS` sizes it and `PRED_FILL` sets string width. Wired into `bench.sh` (`./bench.sh run predicate_eval`) and documented in `benchmarks/sql_benchmarks/README.md`. The design was informed by surveying how Velox drives the analogous decision (it ranks by cycles-per-row-eliminated, `time / (rows_in - rows_out)`). > Note: the `scale` subgroup's `q52`/`q53` build 5M / 50M-row tables (the latter > ~9 GB); run a single point with `BENCH_QUERY` if that is too heavy. ## Are these changes tested? These are benchmark definitions, not engine code. Each `.benchmark` includes an `assert` that the generated table is non-empty, and every subgroup was run locally at small `PRED_ROWS` to confirm the suite parses, loads, asserts, and executes end-to-end. The queries are order-invariant (`SELECT count(*) ...`), so any predicate-ordering system can also be checked for correctness by diffing counts with the optimization on vs. off. ## Are there any user-facing changes? No. This only adds an opt-in benchmark suite and its documentation; no public API, engine behavior, or default configuration changes. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…posal Marginal per-conjunct statistics are blind to correlation: arrangements with very different costs can be statistically identical (the new predicate_eval correlation_q73 case has ~1.9x headroom invisible to any independence-based ranking), and the fused-vs-compact-once strategy difference is invisible to per-conjunct numbers entirely. Borrow the exploration idea from DuckDB's AdaptiveFilter (keep-or-revert timing of random swaps, src/execution/adaptive_filter.cpp): when a measuring window ends with nothing material to propose, occasionally put a random adjacent swap of the incumbent through the existing shared paired A/B trial instead of freezing. Each position carries a likelihood (halved when a swap there loses its trial, restored to 100 when one wins), so exploration of barren positions decays geometrically on top of the re-thaw backoff. The candidate bypasses the model gates by design — it exists because the model cannot see it — but adoption still requires the same measured, confidence-separated end-to-end win as any other proposal, which is a stronger keep-or-revert rule than DuckDB's strict mean comparison. On correlation_q73 (PR apache#22919) this captures 1.28x of the ~1.9x headroom within each 122-batch query (convergence needs two specific adjacent swaps; the rest needs cross-query persistence, cf. DuckDB's multi-file adaptive filter cache, left as future work). Tied micro-queries pay ~4-6% for the exploration trials they decline. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
run benchmark predicate_eval env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (be79329) to 0f8a121 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
…s no proposal" This reverts commit be79329. The exploration trials won the correlated case (predicate_eval q73, 1.28x) but dragged tied and guard-pattern filters through per-query trial churn: CI showed TPC-DS 4 faster / 12 slower and predicate_eval cost_q10/q11 at 1.15-1.16x. Two rounds of statistical hardening (sequential early stopping, a 5% adoption margin) moved the cost between shapes rather than removing it — exploration's wins amortize across queries, but its costs are paid per query, so it structurally needs cross-query persistence of the learned order (cf. DuckDB's multi-file adaptive filter cache) before it can be on by default. Parked, with the early-stopping/margin work, for a follow-up on top of order persistence. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
run benchmark predicate_eval env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
run benchmarks env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmark predicate_eval env:
DATAFUSION_EXECUTION_ADAPTIVE_FILTER_REORDERING: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing lift-selectivity-stats (796bca3) to 0f8a121 (merge-base) diff using: predicate_eval File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagepredicate_eval — base (merge-base)
predicate_eval — branch
File an issue against this benchmark runner |
…biter
Split the responsibility of coordinating "which arrangement should a
collection of filters execute with" from what an arrangement is. The
champion/challenger machinery FilterExec's adaptive reordering uses — the
epoch-broadcast champion, the shared paired A/B trial ledger, the
CI-separated verdict, the rejected-candidate memory — was entangled with
ordering-specific types (conjunct permutations, the evaluation strategy)
inside datafusion-physical-plan, but none of it depends on what is being
decided.
Move it into the adaptive substrate as AdaptiveArbiter<A>, generic over an
opaque arrangement type: FilterExec instantiates it with
{order, strategy}, and an adaptive parquet scan deciding filter *placement*
(row filter vs post-scan, i.e. when to late-materialize — cf. the apache#22144
experiment, which already tracks the same per-filter stats) can instantiate
it with a placement assignment while reusing the identical trial protocol
and the existing SelectivityStats/AdaptiveStatsRegistry. The substrate now
has three policy-free pieces: per-filter measurement, the concurrent
registry, and decision coordination; consumers own proposals, scheduling,
and arrangement semantics.
No behavior change: FilterExec's adaptive path is a mechanical rewire
(AdaptiveFilterShared = registry + arbiter), verified by the unchanged unit
suite and an unchanged perf-harness profile.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Which issue does this PR close?
(static cheap/expensive heuristic reordering). No single issue; happy to file
one if useful.
Rationale for this change
Predicate evaluation order matters: running a selective predicate first lets it
gate the work of the predicates after it. The static cheap/expensive heuristic
(#22343) sorts conjuncts into two cost classes and stable-sorts within each, so
it does nothing to order multiple similarly-expensive predicates; and
BinaryExpr'sANDshort-circuit only gates on a leftmost selective conjunct.So a conjunction of several expensive predicates whose selective member is not
written first is evaluated with every predicate scanning ~every row — and
neither mechanism fixes it.
This PR adds runtime, statistics-based conjunct reordering for
FilterExec:it measures each conjunct's selectivity and cost on the rows that actually reach
it and runs the ones that discard the most rows per unit of CPU time first.
Maximising discards-per-second is exactly minimising
cost_per_row / (1 - pass_rate),the classic optimal ordering key for independent conjuncts.
It is off by default (
datafusion.execution.adaptive_filter_reordering).What changes are included in this PR?
Split into four reviewable commits:
physical-expr-common: adaptive selectivity-stats substrate — apolicy-free
SelectivityStats(online selectivity + cost with Welfordmean/variance and confidence bounds) and a concurrent
AdaptiveStatsRegistry.Reusable by other consumers (e.g. a future parquet-scan integration).
common: config flagexecution.adaptive_filter_reordering(defaultfalse), plus regenerated
configs.md/information_schemalisting.physical-plan: adaptive conjunct reordering inFilterExec— astream-local evaluator that:
BinaryExpr's pre-selection) and measures each marginally;statistically certain (adjacent confidence intervals stop overlapping),
or after a small sample cap if the conjuncts are indistinguishable;
ANDin the learnedorder and evaluates it as an ordinary predicate (no measurement overhead,
inherits
BinaryExprpre-selection);drift, so steady-state overhead decays toward zero.
State is stream-local; the plan, results, and
EXPLAINare unchanged.Tests — an end-to-end
.sltasserting identical results/plan with theflag on and off.
Are these changes tested?
Yes:
SelectivityStats, registry) and theFilterExecevaluator (gating correctness, certainty-freeze, re-thaw backoff,drift adaptation).
adaptive_filter.slt: results andEXPLAINidentical with the flag on/off.Are there any user-facing changes?
One new config option,
datafusion.execution.adaptive_filter_reordering(experimental, default false). When enabled, the order in which a conjunctive
filter's predicates are evaluated may change at runtime; results are unchanged,
but observable side effects of fallible predicates could differ (predicates
containing volatile expressions are never reordered).